# Customize World

This document gives an overview of how to create a new world and describes the methods it needs. Since the **World** class wraps a simulator, please make sure the simulator has been installed successfully before creating your custom world. Here we develop a new world, named `NewSim`, that wraps the new simulator `NewSim`.

## Create a New World

### Create a New World Class

First, create a new file `world_newsim.py` in the directory `LinSignal/world/` and write the following code into it.

```
import newsim
from common.registry import Registry


@Registry.register_world('newsim')
class World(object):
    def __init__(self, newsim_config, **kwargs):
        # section 1: initialize parameters and read traffic network from file
        # initialize parameters
        self.eng = None
        self.intersections = None
        self.id2intersection = None
        self.all_roads = None
        self.all_lanes = None
        self.fns = []  # names of subscribed info functions, filled by subscribe()
        self.info = None
        # read traffic network file
        # code

        # section 2: create intersections, roads, lanes, roadlinks, lanelinks, etc.
        # code

        # section 3: define info_functions
        self.info_functions = {
            "vehicles": self.get_vehicles,
            "lane_count": self.get_lane_vehicle_count,
            "lane_waiting_count": self.get_lane_waiting_count,
            "lane_vehicles": self.get_lane_vehicles,
            "time": self.get_current_time,
            "vehicle_distance": self.get_vehicle_distance,
            "pressure": self.get_pressure,
            "lane_waiting_time_count": self.get_lane_waiting_time_count,
            "lane_delay": self.get_lane_delay,
            "real_delay": self.get_real_delay,
            "vehicle_trajectory": self.get_vehicle_trajectory,
            "history_vehicles": self.get_history_vehicles,
            "phase": self.get_cur_phase,
            "throughput": self.get_cur_throughput,
            "average_travel_time": self.get_average_travel_time
        }

    def subscribe(self, fns):
        pass

    def _update_infos(self):
        pass

    def step(self, actions=None):
        pass

    def reset(self):
        pass
```

You can see that we first register the new world using `Registry`. Then we define `__init__()` and the methods that must be implemented.
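
For readers unfamiliar with decorator-based registration, the following is a minimal sketch of how such a registry could work. `ToyRegistry` and its `get_world()` method are hypothetical names used only for illustration; they are not the actual `common.registry.Registry`, whose internals are not described in this document.

```python
# Hypothetical stand-in for a registry class, for illustration only.
class ToyRegistry:
    _worlds = {}

    @classmethod
    def register_world(cls, name):
        """Return a decorator that stores the decorated class under `name`."""
        def wrap(world_cls):
            cls._worlds[name] = world_cls
            return world_cls  # the class itself is left unchanged
        return wrap

    @classmethod
    def get_world(cls, name):
        """Look up a previously registered world class by name."""
        return cls._worlds[name]


@ToyRegistry.register_world('newsim')
class World(object):
    pass


print(ToyRegistry.get_world('newsim') is World)  # True
```

With a pattern like this, the name passed to the decorator is what a launcher can use to look up the world class at run time.
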
Section 1 of `__init__()` reads information from the traffic network file and initializes parameters, including the engine, intersections, roads, lanes and others. Users can also add parameters of their own here for later use.

Section 2 of `__init__()` creates intersections, roads and other required objects according to the road network file read in Section 1. Since the format of **Atomic Files** differs between simulators, each simulator needs its own building methods.

Section 3 of `__init__()` defines `info_functions`, which is used to retrieve and update information from the environment. In `info_functions`, information names and the methods that produce them are mapped as key-value pairs. Users can expose custom information by adding a key-value mapping between an information name and a method. **Note** that every method that appears as a value in `info_functions` must be implemented, either by calling interfaces of the simulator or as a user-defined method.

Next, we introduce the methods that must be implemented when creating a new **World**.

### Implement subscribe()

The `subscribe()` method registers the information that users want to get. When a **Generator** is initialized, its `__init__()` method calls `subscribe()`, so that the **Generator** can later call its `generate()` method to obtain the corresponding information after each episode or each step. Note that the name of each subscribed piece of information must be a key of `info_functions`.

You can define `subscribe()` like this:

```
@Registry.register_world('newsim')
class World(object):
    def subscribe(self, fns):
        # accept a single name as well as a list of names
        if isinstance(fns, str):
            fns = [fns]
        for fn in fns:
            if fn in self.info_functions:
                # avoid subscribing to the same information twice
                if fn not in self.fns:
                    self.fns.append(fn)
            else:
                raise Exception("info function %s does not exist" % fn)
```

### Implement \_update\_infos()

The `_update_infos()` method updates the world's global information after a reset or after each step.
You can define `_update_infos()` like this:

```
@Registry.register_world('newsim')
class World(object):
    def _update_infos(self):
        # call every subscribed info function and cache its result
        self.info = {}
        for fn in self.fns:
            self.info[fn] = self.info_functions[fn]()
```

### Implement step()

To let **Agent**s interact with the **World**, we implement the `step()` method. It takes the actions generated by the **Agent**s as input, applies them intersection by intersection, and then updates the information, including global information, measurements, trajectories, etc.

You can define `step()` like this:

```
@Registry.register_world('newsim')
class World(object):
    def step(self, actions=None):
        # section 1: take action to the intersections
        if actions is not None:
            for i, intersection in enumerate(self.intersections):
                # self.interval (simulation time per step) is assumed to be set in __init__()
                intersection.step(actions[i], self.interval)
        self.eng.next_step()
        # section 2: update information
        self._update_infos()
        # section 3: update other information
        # code
```

### Implement reset()

Taking CityFlow as an example, you can define `reset()` like this:

```
@Registry.register_world('newsim')
class World(object):
    def reset(self):
        # section 1: reconnect engine, in CityFlow:
        self.eng.reset()
        # section 2: reset intersections' information
        for intersection in self.intersections:
            intersection.reset()
        # section 3: reset related information
        self._update_infos()
        # section 4: reset other information
        # code
```

## Create a New Intersection

### Create a New Intersection Class

First, add the following code to the file `LinSignal/world/world_newsim.py`:

```
class Intersection(object):
    def __init__(self, intersection, world):
        # section 1: initialize parameters
        self.id = intersection["id"]
        self.world = world  # the world this intersection belongs to
        self.eng = world.eng
        self.roads = []
        self.outs = []
        self.directions = []
        self.out_roads = None
        self.in_roads = None
        # section 2: set available phases and other related parameters
        # code

    def _change_phase(self, phase, interval):
        pass

    def step(self, action, interval):
        pass

    def reset(self):
        pass

    def _get_direction(self, road, out=True):
        pass

    def sort_roads(self):
        pass
```

You can see that in the `__init__()` method, we take the
world and intersection information as input. The methods that are essential to the **Intersection** class are listed after `__init__()`.

Section 1 of `__init__()` defines and initializes parameters, including the world this intersection belongs to, the roads and lanes, etc.

Section 2 of `__init__()` sets the available phases and other related parameters. This part is implemented differently for each simulator because their internal implementations differ.

Next, we introduce the methods that must be implemented when creating a new **Intersection**.

### Implement \_change\_phase()

An intersection needs a method for switching traffic phases. `_change_phase()` provides an interface that changes the current phase to the next phase by interacting with the simulator. Its `phase` parameter is the phase to switch to, and `interval` is the step length passed in by `step()`. Taking CityFlow as an example, you can define `_change_phase()` like this:

```
class Intersection(object):
    def _change_phase(self, phase, interval):
        # tell the simulator to switch this intersection's traffic light
        self.eng.set_tl_phase(self.id, phase)
        self._current_phase = phase
        # assumption: restart the phase timer so step() can time the yellow phase
        self.current_phase_time = interval
```

### Implement step()

The `step()` method makes the intersection execute the given action for one interval and then updates related information, including `current_phase`, `current_phase_time`, etc. Take CityFlow as an example.
You can define `step()` like this:

```
class Intersection(object):
    def step(self, action, interval):
        # if current phase is yellow, then continue to finish the yellow phase
        if self._current_phase in self.yellow_phase_id:
            # >= rather than == so the check still fires when interval does not divide yellow_phase_time
            if self.current_phase_time >= self.yellow_phase_time:
                self._change_phase(self.phases[self.action_before_yellow], interval)
                self.current_phase = self.action_before_yellow
                self.action_executed = self.action_before_yellow
            else:
                self.current_phase_time += interval
        # if current phase is not yellow, then change the phase
        else:
            # if the requested phase is the same as the current phase, just extend its time
            if action == self.current_phase:
                self.current_phase_time += interval
            else:
                # if a yellow light follows each green light, the next phase is a transitional yellow phase
                if self.yellow_phase_time > 0:
                    # in SUMO, the yellow (red) phase is placed right after each green phase
                    if self.if_sumo:
                        assert (self._current_phase + 1) % len(self.all_phases) in self.yellow_phase_id
                        self._change_phase((self._current_phase + 1) % len(self.all_phases), interval)
                    # in CityFlow, the yellow (red) phase is fixed at the first position in the phase list
                    else:
                        self._change_phase(self.yellow_phase_id[0], interval)
                    self.action_before_yellow = action
                else:
                    self._change_phase(self.phases[action], interval)
                    self.current_phase = action
                    self.action_executed = action
```

### Implement reset()

The `reset()` method resets `current_phase`, `action_before_yellow`, `action_executed`, etc. By default, the first phase after resetting the environment is the one in the first position of the phase list. Take CityFlow as an example.
You can define `reset()` like this:

```
class Intersection(object):
    def reset(self):
        # section 1: set current phase id to 0, and pass the current phase to the engine
        self.current_phase = 0
        if len(self.phases) == 0:
            self._current_phase = 0
        else:
            self._current_phase = self.phases[0]  # true phase id (including yellow)
        self.eng.set_tl_phase(self.id, self._current_phase)
        # section 2: reset other information
        self.current_phase_time = 0
        self.action_before_yellow = None
        self.action_executed = None
```

Section 1 of `reset()` sets the current phase to the one that occupies the first position in the phase list.

Section 2 of `reset()` resets related information, including the current phase time and others.

### Implement \_get\_direction()

`_get_direction()` calculates the angle of a road, which is used later for sorting roads. The default order of roads is N-E-S-W. The calculation differs between simulators. Taking CityFlow as an example, the code for getting the direction of a road is as follows:

```
from math import atan2, pi


class Intersection(object):
    def _get_direction(self, road, out=True):
        # use the road segment that touches the intersection
        if out:
            x = road[1][0] - road[0][0]
            y = road[1][1] - road[0][1]
        else:
            x = road[-2][0] - road[-1][0]
            y = road[-2][1] - road[-1][1]
        # atan2(x, y) measures the angle clockwise from north
        tmp = atan2(x, y)
        return tmp if tmp >= 0 else (tmp + 2 * pi)
```

### Implement sort\_roads()

`sort_roads()` sorts the roads, including in roads, out roads and others, in a specific order. Taking CityFlow as an example, the code for sorting is as follows:

```
class Intersection(object):
    def sort_roads(self):
        # sort by direction first, then by whether the road is outgoing
        order = sorted(
            range(len(self.roads)),
            key=lambda i: (self.directions[i], self.outs[i] if self.world.RIGHT else not self.outs[i])
        )
        self.roads = [self.roads[i] for i in order]
        self.directions = [self.directions[i] for i in order]
        self.outs = [self.outs[i] for i in order]
        self.out_roads = [self.roads[i] for i, x in enumerate(self.outs) if x]
        self.in_roads = [self.roads[i] for i, x in enumerate(self.outs) if not x]
```
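
To check that the angle computation above really yields the N-E-S-W order, here is a small standalone sketch. It reuses the same `atan2(x, y)` formula as a free function; the road names and coordinates are made-up sample data, and each road is assumed to be a list of `(x, y)` points as in the indexing used above.

```python
from math import atan2, pi


def get_direction(points, out=True):
    """Clockwise angle from north, in [0, 2*pi), of a road at the intersection.

    For an outgoing road the first point is at the intersection;
    for an incoming road the last point is.
    """
    if out:
        x = points[1][0] - points[0][0]
        y = points[1][1] - points[0][1]
    else:
        x = points[-2][0] - points[-1][0]
        y = points[-2][1] - points[-1][1]
    angle = atan2(x, y)  # arguments swapped on purpose: 0 is north, pi/2 is east
    return angle if angle >= 0 else angle + 2 * pi


# Hypothetical outgoing roads leaving an intersection at the origin.
roads = {
    "west": [(0, 0), (-100, 0)],
    "south": [(0, 0), (0, -100)],
    "north": [(0, 0), (0, 100)],
    "east": [(0, 0), (100, 0)],
}

ordered = sorted(roads, key=lambda name: get_direction(roads[name]))
print(ordered)  # ['north', 'east', 'south', 'west']
```

An incoming road on the northern arm, for example `[(0, 100), (0, 0)]` with `out=False`, gets the same angle as the outgoing road on that arm, which is why `sort_roads()` needs the second sort key to order the two.
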
Now that you have learned how to add a new world, try the following command to use it:

```
python run.py -w newsim
```
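
Before wiring in a real simulator, it can help to see how `subscribe()`, `_update_infos()`, `step()` and `reset()` fit together. The following is a minimal sketch under simplifying assumptions: `FakeEngine` and `ToyWorld` are hypothetical stand-ins that only count steps, and `"time"` is the only info function provided.

```python
class FakeEngine:
    """Hypothetical stand-in for a simulator engine; it only counts steps."""

    def __init__(self):
        self.time = 0

    def next_step(self):
        self.time += 1

    def reset(self):
        self.time = 0


class ToyWorld:
    def __init__(self):
        self.eng = FakeEngine()
        self.fns = []
        self.info = {}
        # map each information name to the method that retrieves it
        self.info_functions = {"time": self.get_current_time}

    def get_current_time(self):
        return self.eng.time

    def subscribe(self, fns):
        if isinstance(fns, str):
            fns = [fns]
        for fn in fns:
            if fn not in self.info_functions:
                raise KeyError("info function %s does not exist" % fn)
            if fn not in self.fns:
                self.fns.append(fn)

    def _update_infos(self):
        # refresh only the information that has been subscribed to
        self.info = {fn: self.info_functions[fn]() for fn in self.fns}

    def step(self, actions=None):
        # no intersections in this toy world, so actions are ignored
        self.eng.next_step()
        self._update_infos()

    def reset(self):
        self.eng.reset()
        self._update_infos()


world = ToyWorld()
world.subscribe("time")
world.reset()
for _ in range(3):
    world.step()
print(world.info)  # {'time': 3}
```

Only subscribed names are refreshed, so information that nobody asked for costs nothing per step.
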